{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\" \n",
    "     width=\"30%\" \n",
    "     align=right\n",
    "     alt=\"Dask logo\">\n",
    "\n",
    "# Embarrassingly parallel Workloads\n",
    "\n",
    "This notebook shows how to use Dask to parallelize embarrassingly parallel workloads where you want to apply one function to many pieces of data independently.  It will show three different ways of doing this with Dask:\n",
    "\n",
    "1. [dask.delayed](http://dask.pydata.org/en/latest/delayed.html) \n",
    "2. [concurrent.Futures](https://dask.pydata.org/en/latest/futures.html) \n",
    "3. [dask.bag](https://dask.pydata.org/en/latest/bag.html)\n",
    "\n",
    "This example focuses on using Dask for building large embarrassingly parallel computation as often seen in scientific communities and on High Performance Computing facilities, for example with Monte Carlo methods. This kind of simulation assume the following:\n",
    "\n",
    " - We have a function that runs a heavy computation given some parameters.\n",
    " - We need to compute this function on many different input parameters, each function call being independent.\n",
    " - We want to gather all the results in one place for further analysis."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start Dask Client for Dashboard\n",
    "\n",
    "Starting the Dask Client will provide a dashboard which \n",
    "is useful to gain insight on the computation.  We will also need it for the\n",
    "Futures API part of this example. Moreover, as this kind of computation\n",
    "is often launched on super computer or in the Cloud, you will probably end\n",
    "up having to start a cluster and connect a client to scale.  See \n",
    "[dask-jobqueue](https://github.com/dask/dask-jobqueue),\n",
    "[dask-kubernetes](https://github.com/dask/dask-kubernetes) or \n",
    "[dask-yarn](https://github.com/dask/dask-yarn) for easy ways to achieve this\n",
    "on respectively an HPC, Cloud or Big Data infrastructure.\n",
    "\n",
    "The link to the dashboard will become visible when you create the client below.  We recommend having it open on one side of your screen while using your notebook on the other side.  This can take some effort to arrange your windows, but seeing them both at the same time is very useful when learning."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client, progress\n",
    "client = Client(threads_per_worker=4, n_workers=1)\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define your computation calling function\n",
    "\n",
    "This function does a simple operation: add all numbers of a list/array together, but it also sleeps for a random amount of time to simulate real work. In real use cases, this could call another python module, or even run an executable using subprocess module."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import random\n",
    "\n",
    "def costly_simulation(list_param):\n",
    "    time.sleep(random.random())\n",
    "    return sum(list_param)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We try it locally below"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%time costly_simulation([1, 2, 3, 4])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define the set of input parameters to call the function\n",
    "\n",
    "We will generate a set of inputs on which we want to run our simulation function. Here we use Pandas dataframe, but we could also use a simple list. Lets say that our simulation is run with four parameters called param_[a-d]."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import numpy as np\n",
    "\n",
    "input_params = pd.DataFrame(np.random.random(size=(500, 4)),\n",
    "                            columns=['param_a', 'param_b', 'param_c', 'param_d'])\n",
    "input_params.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Without using Dask, we could call our simulation on all of these parameters using normal Python for loops.\n",
    "\n",
    "Let's only do this on a sample of our parameters as it would be quite long otherwise."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "results = []"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "for parameters in input_params.values[:10]:\n",
    "    result = costly_simulation(parameters)\n",
    "    results.append(result)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "results"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that this is not very clever as we can easily parallelize code. \n",
    "\n",
    "There are many ways to parallelize this function in Python with libraries like `multiprocessing`, `concurrent.futures`, `joblib` or others.  These are good first steps.  Dask is a good second step, especially when you want to scale across many machines.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Use [Dask Delayed](http://dask.pydata.org/en/latest/delayed.html) to make our function lazy\n",
    "\n",
    "We can call `dask.delayed` on our funtion to make it lazy.  Rather than compute its result immediately, it records what we want to compute as a task into a graph that we'll run later on parallel hardware. Using `dask.delayed` is a relatively straightforward way to parallelize an existing code base, even if the computation isn't embarrassingly parallel like this one. \n",
    "\n",
    "Calling these lazy functions is now almost free.  In the cell below we only construct a simple graph."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask\n",
    "lazy_results = []"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "for parameters in input_params.values[:10]:\n",
    "    lazy_result = dask.delayed(costly_simulation)(parameters)\n",
    "    lazy_results.append(lazy_result)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lazy_results[0]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run in parallel\n",
    "\n",
    "The `lazy_results` list contains information about ten calls to `costly_simulation` that have not yet been run.  Call `.compute()` when you want your result as normal Python objects.\n",
    "\n",
    "If you started `Client()` above then you may want to watch the status page during computation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%time dask.compute(*lazy_results)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Notice that this was faster than running these same computations sequentially with a for loop.  "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can now run this on all of our input parameters:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask\n",
    "lazy_results = []\n",
    "\n",
    "for parameters in input_params.values:\n",
    "    lazy_result = dask.delayed(costly_simulation)(parameters)\n",
    "    lazy_results.append(lazy_result)\n",
    "    \n",
    "futures = dask.persist(*lazy_results)  # trigger computation in the background"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make this go faster, we can add additional workers.\n",
    "\n",
    "(although we're still only working on our local machine, this is more practical when using an actual cluster)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client.cluster.scale(10)  # ask for ten 4-thread workers"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "By looking at the Dask dashboard we can see that Dask spreads this work around our cluster, managing load balancing, dependencies, etc..\n",
    "\n",
    "Then get the result:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "results = dask.compute(*futures)\n",
    "results[:5]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using the [Futures API](http://dask.pydata.org/en/latest/futures.html)\n",
    "\n",
    "The same example can be implemented using Dask's Futures API by using the `client` object itself.  For our use case of applying a function across many inputs both Dask delayed and Dask Futures are equally useful.  The Futures API is a little bit different because it starts work immediately rather than being completely lazy.\n",
    "\n",
    "For example, notice that work starts immediately in the cell below as we submit work to the cluster:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "futures = []\n",
    "for parameters in input_params.values:\n",
    "    future = client.submit(costly_simulation, parameters)\n",
    "    futures.append(future)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can explicitly wait until this work is done and gather the results to our local process by calling `client.gather`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "results = client.gather(futures)\n",
    "results[:5]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "But the code above can be run in fewer lines with `client.map()` function, allowing to call a given function on a list of parameters.\n",
    "\n",
    "As for delayed, we can only start the computation and not wait for results by not calling `client.gather()` right now.\n",
    "\n",
    "It shall be noted that as Dask cluster has already performed tasks launching `costly_simulation` with Futures API on the given input parameters, the call to `client.map()` won't actually trigger any computation, and just retrieve already computed results."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "futures = client.map(costly_simulation, input_params.values)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Then just get the results later:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "results = client.gather(futures)\n",
    "len(results)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(results[0])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We encourage you to watch the [dashboard's status page](../proxy/8787/status) to watch on going computation."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Doing some analysis on the results\n",
    "\n",
    "One of the interests of Dask here, outside from API simplicity, is that you are able to gather the result for all your simulations in one call.  There is no need to implement a complex mechanism or to write individual results in a shared file system or object store.\n",
    "\n",
    "Just get your result, and do some computation."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here, we will just get the results and expand our initial dataframe to have a nice view of parameters vs results for our computation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "output = input_params.copy()\n",
    "output['result'] = pd.Series(results, index=output.index)\n",
    "output.sample(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Then we can do some nice statistical plots or save result locally with pandas interface here"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%matplotlib inline\n",
    "output['result'].plot()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "output['result'].mean()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "filtered_output = output[output['result'] > 2]\n",
    "print(len(filtered_output))\n",
    "filtered_output.to_csv('/tmp/simulation_result.csv')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Handling very large simulation with [Bags](http://dask.pydata.org/en/latest/bag.html)\n",
    "\n",
    "The methods above work well for a size of input parameters up to about 100,000.  Above that, the Dask scheduler has trouble handling the amount of tasks to schedule to workers.  The solution to this problem is to bundle many parameters into a single task.\n",
    "You could do this either by making a new function that operated on a batch of parameters and using the delayed or futures APIs on that function.  You could also use the Dask Bag API.  This is described more in the documentation about [avoiding too many tasks](http://dask.pydata.org/en/latest/delayed-best-practices.html#avoid-too-many-tasks).\n",
    "\n",
    "Dask Bags hold onto large sequences in a few partitions.  We can convert our `input_params` sequence into a `dask.bag` collection, asking for fewer partitions (so at most 100,000, which is already huge), and apply our function on every item of the bag."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask.bag as db\n",
    "b = db.from_sequence(list(input_params.values), npartitions=100)\n",
    "b = b.map(costly_simulation)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%time results_bag = b.compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Looking on Dashboard here, you should see only 100 tasks to run instead of 500, each taking 5x more time in average, because each one is actually calling our function 5 times."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "np.all(results) == np.all(results_bag)"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
